{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TensorFlowOnSpark with Spark ML Pipelines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[Spark ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html) provide high-level APIs (inspired by [scikit-learn](http://scikit-learn.org)) for Spark-based machine learning algorithms.\n",
    "\n",
    "This notebook demonstrates support for these APIs within TensorFlowOnSpark via the introduction of a new [pipeline](https://github.com/yahoo/TensorFlowOnSpark/blob/master/tensorflowonspark/pipeline.py) module consisting of two main classes: \n",
    "\n",
    "1. [TFEstimator](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.pipeline.html#tensorflowonspark.pipeline.TFEstimator) - A Spark ML Estimator which wraps a distributed TensorFlowOnSpark cluster for training.\n",
    "2. [TFModel](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.pipeline.html#tensorflowonspark.pipeline.TFModel) - A Spark ML Model which represents a TensorFlow model checkpoint or [saved_model](https://www.tensorflow.org/programmers_guide/saved_model#apis_to_build_and_load_a_savedmodel) on disk.  **Note**: due to architectural limitations, transform/inferencing is conducted on the executors as parallel instances of a single-node TensorFlow application (vs. a distributed TensorFlow cluster), so the model must fit in the memory of a single executor.\n",
    "\n",
    "In addition, there is a new [dfutil](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.dfutil.html) module which provides helper functions to convert from TensorFlow TFRecords to Spark DataFrames and vice versa.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Start a Spark Standalone Cluster\n",
    "\n",
    "First, in a terminal/shell window, start a single-machine Spark Standalone Cluster with three workers:\n",
    "```\n",
    "export MASTER=spark://$(hostname):7077\n",
    "export SPARK_WORKER_INSTANCES=3\n",
    "export CORES_PER_WORKER=1\n",
    "export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES})) \n",
    "${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Launch the Spark Jupyter Notebook\n",
    "\n",
    "Now, in the same window, launch a Pyspark Jupyter notebook:\n",
    "```\n",
    "cd ${TFoS_HOME}/examples/mnist\n",
    "PYSPARK_DRIVER_PYTHON=\"jupyter\" \\\n",
    "PYSPARK_DRIVER_PYTHON_OPTS=\"notebook --ip=`hostname`\" \\\n",
    "pyspark  --master ${MASTER} \\\n",
    "--conf spark.cores.max=${TOTAL_CORES} \\\n",
    "--conf spark.task.cpus=${CORES_PER_WORKER} \\\n",
    "--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist_pipeline.py \\\n",
    "--conf spark.executorEnv.JAVA_HOME=\"$JAVA_HOME\"\n",
    "```\n",
    "\n",
    "This should open a Jupyter browser pointing to the directory where this notebook is hosted.\n",
    "Click on the TFOS_pipeline.ipynb file, and begin executing the steps of the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "from pyspark.conf import SparkConf\n",
    "from pyspark.context import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "import argparse\n",
    "import os\n",
    "import subprocess\n",
    "import sys\n",
    "from datetime import datetime\n",
    "\n",
    "import tensorflow as tf\n",
    "from tensorflowonspark import dfutil\n",
    "from tensorflowonspark.pipeline import TFEstimator, TFModel\n",
    "\n",
    "import mnist_dist_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# configure to match your cluster\n",
    "num_executors = 3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note**: for a Spark Standalone cluster on a single machine, the executors will operate from different working directories, so relative paths won't work across the cluster.  This code just maps relative paths to the absolute path of this notebook's current working directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cwd = os.getcwd()\n",
    "model_dir = os.sep.join([cwd, \"mnist_model\"])       # path to TensorFlow model/checkpoint\n",
    "export_dir = os.sep.join([cwd, \"mnist_export\"])     # path to TensorFlow saved_model export\n",
    "output = os.sep.join([cwd, \"predictions\"])          # path to output of inferencing\n",
    "\n",
    "print(model_dir)\n",
    "print(export_dir)\n",
    "print(output)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean up any prior artifacts\n",
    "subprocess.call([\"rm\", \"-rf\", model_dir])\n",
    "subprocess.call([\"rm\", \"-rf\", export_dir])\n",
    "subprocess.call([\"rm\", \"-rf\", output])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set up parser for command-line options\n",
    "parser = argparse.ArgumentParser()\n",
    "\n",
    "## TFoS/cluster\n",
    "parser.add_argument(\"--batch_size\", help=\"number of records per batch\", type=int, default=100)\n",
    "parser.add_argument(\"--epochs\", help=\"number of epochs\", type=int, default=1)\n",
    "parser.add_argument(\"--model_dir\", help=\"HDFS path to save/load model during train/inference\", type=str)\n",
    "parser.add_argument(\"--export_dir\", help=\"HDFS path to export saved_model\", type=str)\n",
    "parser.add_argument(\"--cluster_size\", help=\"number of nodes in the cluster\", type=int, default=num_executors)\n",
    "parser.add_argument(\"--num_ps\", help=\"number of PS nodes in cluster\", type=int, default=1)\n",
    "parser.add_argument(\"--protocol\", help=\"Tensorflow network protocol (grpc|rdma)\", default=\"grpc\")\n",
    "parser.add_argument(\"--steps\", help=\"maximum number of steps\", type=int, default=1000)\n",
    "parser.add_argument(\"--tensorboard\", help=\"launch tensorboard process\", action=\"store_true\")\n",
    "\n",
    "# Spark input/output\n",
    "parser.add_argument(\"--format\", help=\"example format: (csv|tfr)\", choices=[\"csv\",\"tfr\"], default=\"csv\")\n",
    "parser.add_argument(\"--images\", help=\"HDFS path to MNIST images in parallelized format\")\n",
    "parser.add_argument(\"--labels\", help=\"HDFS path to MNIST labels in parallelized format\")\n",
    "parser.add_argument(\"--output\", help=\"HDFS path to save test/inference output\", default=\"predictions\")\n",
    "\n",
    "# Execution Modes\n",
    "parser.add_argument(\"--train\", help=\"train a model using Estimator\", action=\"store_true\")\n",
    "parser.add_argument(\"--inference_mode\", help=\"type of inferencing (none|checkpoint|signature|direct)\", choices=[\"none\",\"signature\",\"direct\",\"checkpoint\"], default=\"none\")\n",
    "parser.add_argument(\"--inference_output\", help=\"output type for inferencing (predictions|features)\", choices=[\"predictions\",\"features\"], default=\"predictions\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Declare arguments for this session\n",
    "args = parser.parse_args([\"--model_dir\", model_dir, \\\n",
    "                          \"--export_dir\", export_dir, \\\n",
    "                          \"--output\", output, \\\n",
    "                          \"--images\", \"csv/train/images\", \\\n",
    "                          \"--labels\", \"csv/train/labels\", \\\n",
    "                          \"--train\", \\\n",
    "                          \"--inference_mode\", \"checkpoint\", \\\n",
    "                          \"--inference_output\", \"predictions\"])\n",
    "print(args)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following code supports reading the MNIST data as both TFRecords and CSV files.  It is assumed that you've already converted the MNIST binary data to either of these formats.  If not, you can refer to the [Spark Standalone example](https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_Standalone#convert-the-mnist-zip-files-using-spark) for instructions.\n",
    "\n",
    "For TFRecords, this leverages the `dfutil` module to load the TFRecords at `args.images` as a Spark DataFrame.  This conversion assumes a flat TFRecord structure, i.e. a simple list of features consisting of standard types, that can be easily mapped to DataFrame columns.  Deeply nested structures and variable schemas are not currently supported, so for those datasets, you may need to write a custom loader/converter.\n",
    "\n",
    "For CSV, this just uses traditional Spark RDD APIs to read/transform the text files, zip the images with the labels, and then convert the resulting RDD into a DataFrame.  Note: this uses a trivial CSV parser to keep the code simple."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if args.format == \"tfr\":\n",
    "    df = dfutil.loadTFRecords(sc, args.images)\n",
    "elif args.format == \"csv\":\n",
    "    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])\n",
    "    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])\n",
    "    dataRDD = images.zip(labels)\n",
    "    df = spark.createDataFrame(dataRDD, ['image', 'label'])\n",
    "else:\n",
    "    raise Exception(\"Unsupported format: {}\".format(args.format))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, train the model using a `TFEstimator`.  This class supports ML Params for arguments and hyper-parameters that are common across TensorFlow applications.  The constructor accepts the TensorFlow \"map_fun\" (i.e. the \"main\" function converted to the expected TensorFlowOnSpark API signature) along with an optional dictionary of application-specific hyper-parameters.  Note: this shows application-specific hyper-parameters for the Inception network only as an example, since the MNIST network does not have specific hyper-parameters.\n",
    "\n",
    "When `TFEstimator.fit()` is invoked, it will launch a TensorFlowOnSpark cluster for distributed training, with the model checkpoint persisted on disk.  If an `--export_dir` is supplied above, this TensorFlow application will also export a saved_model to that directory.  At the end of training, the TensorFlowOnSpark cluster will be automatically shut down.\n",
    "\n",
    "If the `--train` argument is not supplied above, this code will skip training and just construct a `TFModel` instance using the same arguments to represent a model checkpoint and/or saved_model already available on disk."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if args.train:\n",
    "    # train a model using Spark Estimator fitted to a DataFrame\n",
    "    # dummy tf args (from imagenet/inception example)\n",
    "    tf_args = { 'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94 }\n",
    "    estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \\\n",
    "          .setInputMapping({'image':'image', 'label':'label'}) \\\n",
    "          .setModelDir(args.model_dir) \\\n",
    "          .setExportDir(args.export_dir) \\\n",
    "          .setClusterSize(args.cluster_size) \\\n",
    "          .setNumPS(args.num_ps) \\\n",
    "          .setProtocol(args.protocol) \\\n",
    "          .setTensorboard(args.tensorboard) \\\n",
    "          .setEpochs(args.epochs) \\\n",
    "          .setBatchSize(args.batch_size) \\\n",
    "          .setSteps(args.steps)\n",
    "    model = estimator.fit(df)\n",
    "else:\n",
    "    # use a previously trained/exported model\n",
    "    model = TFModel(args) \\\n",
    "        .setExportDir(args.export_dir) \\\n",
    "        .setBatchSize(args.batch_size)\n",
    "        "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check the model checkpoint\n",
    "print(subprocess.check_output([\"ls\", \"-l\", model_dir]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check the exported saved_model\n",
    "print(subprocess.check_output([\"ls\", \"-lR\", export_dir]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "At this point, the model checkpoint and the exported saved_model are persisted on disk.  The following code demonstrates several different ways to load and use these models for inferencing.  The different modes, described below, are specified by the `--inference_mode` command-line argument:\n",
    "\n",
    "1. **none** - skip inferencing entirely (useful when debugging training step).\n",
    "2. **checkpoint** - load the model directly from the TensorFlow checkpoint, and map the DataFrame columns to specific tensors.\n",
    "3. **signature** - load the model from the saved_model export and use the exported input/output signatures.  Note: these signatures provide a level of indirection between the signature's tensor \"aliases\" and the actual tensors.  This is intended to provide stable gRPC signatures for TensorFlow-Serving calls, even when the underlying model changes.  In this mode, the DataFrame columns are mapped to these provided \"aliases\".\n",
    "4. **direct** - load the model from the saved_model export, but ignore the exported signatures and tensor \"aliases\".  In this mode, DataFrame columns are mapped directly to the underlying tensors.  This can be useful if the user has a previously trained/exported saved_model but wants to access tensors that weren't originally mapped to a published signature."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NO INFERENCING\n",
    "if args.inference_mode == 'none':\n",
    "    sys.exit(0)\n",
    "    \n",
    "# INFER FROM TENSORFLOW CHECKPOINT\n",
    "elif args.inference_mode == 'checkpoint':\n",
    "    model.setModelDir(args.model_dir)                         # load model from checkpoint at args.model_dir\n",
    "    model.setExportDir(None)                                  # don't use a saved_model\n",
    "    model.setInputMapping({'image':'x'})                      # map DataFrame 'image' column to the 'x' input tensor\n",
    "    if args.inference_output == 'predictions':\n",
    "        model.setOutputMapping({'prediction':'col_out'})      # map 'prediction' output tensor to output DataFrame 'col_out' column\n",
    "    else:  # args.inference_output == 'features':\n",
    "        model.setOutputMapping({'prediction':'col_out', 'Relu':'col_out2'})   # add 'Relu' output tensor to output DataFrame 'col_out2' column\n",
    "\n",
    "# INFER USING TENSORFLOW SAVED_MODEL WITH EXPORTED SIGNATURES\n",
    "elif args.inference_mode == 'signature':\n",
    "    model.setModelDir(None)                                   # don't use the model checkpoint\n",
    "    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir\n",
    "    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset\n",
    "    model.setInputMapping({'image':'image'})                  # map DataFrame 'image' column to the 'image' input tensor alias of signature\n",
    "    if args.inference_output == 'predictions':\n",
    "        model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)   # default signature def key, i.e. 'predict'\n",
    "        model.setOutputMapping({'prediction':'col_out'})      # map 'prediction' output tensor alias to output DataFrame 'col_out' column\n",
    "    else:  # args.inference_output == 'features'\n",
    "        model.setSignatureDefKey('featurize')                 # custom signature def key\n",
    "        model.setOutputMapping({'features':'col_out'})        # map 'features' output tensor alias to output DataFrame 'col_out' column\n",
    "\n",
    "# INFER USING TENSORFLOW SAVED_MODEL, IGNORING EXPORTED SIGNATURES\n",
    "else:  # args.inference_mode == 'direct':\n",
    "    model.setModelDir(None)                                   # don't use the model checkpoint\n",
    "    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir\n",
    "    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset\n",
    "    model.setInputMapping({'image':'x'})                      # map DataFrame 'image' column to the 'x' input tensor\n",
    "    if args.inference_output == 'predictions':\n",
    "        model.setOutputMapping({'prediction': 'col_out'})     # map 'prediction' output tensor to output DataFrame 'col_out' column\n",
    "    else:  # args.inference_output == 'features'\n",
    "        model.setOutputMapping({'prediction': 'col_out', 'Relu': 'col_out2'})   # add 'Relu' output tensor to output DataFrame 'col_out2' column\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, invoke the `TFModel.transform()` method and save the output DataFrame.  **Note**: Spark \"transformations\" are \"lazy\" by design, so no actual inferencing will occur until an \"action\" is invoked on the output DataFrame `preds`, which in this case is the `write.json` call below to save the output to disk."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"{0} ===== Model.transform()\".format(datetime.now().isoformat()))\n",
    "preds = model.transform(df)\n",
    "preds.write.json(args.output)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(subprocess.check_output([\"ls\", \"-l\", output]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Shutdown\n",
    "\n",
    "In your terminal/shell window, you can type `<ctrl-C>` to exit the Notebook server.\n",
    "\n",
    "Then, stop the Standalone Cluster via:\n",
    "```\n",
    "${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
